{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "fb190a9d-901a-4447-94a7-6123d853876b",
   "metadata": {},
   "outputs": [],
   "source": [
    "from elasticsearch import Elasticsearch\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "eca2ac5a-2bb5-4eed-9a3c-84c68ad1acb0",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'name': 'liuxgm.local', 'cluster_name': 'elasticsearch', 'cluster_uuid': 'n1BjmRPcR2GObT6ZMbJ9xA', 'version': {'number': '8.11.0', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': 'd9ec3fa628c7b0ba3d25692e277ba26814820b20', 'build_date': '2023-11-04T10:04:57.184859352Z', 'build_snapshot': False, 'lucene_version': '9.8.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}\n"
     ]
    }
   ],
   "source": [
    "elastic_user=os.getenv('ES_USER')\n",
    "elastic_password=os.getenv('ES_PASSWORD')\n",
    "elastic_endpoint=os.getenv(\"ES_ENDPOINT\")\n",
    " \n",
    "url = f\"https://{elastic_user}:{elastic_password}@{elastic_endpoint}:9200\"\n",
    "es = Elasticsearch(url, ca_certs = \"./http_ca.crt\", verify_certs = True)\n",
    " \n",
    "print(es.info())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a067fa1d-4fd5-4396-af7c-1d165976270d",
   "metadata": {},
   "source": [
    "# ingest pipeline definition"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "d1fa1993-e103-416c-9dc0-69f7ac8016ba",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ObjectApiResponse({'acknowledged': True})"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "PIPELINE_ID=\"vectorize_books_elser\"\n",
    "\n",
    "es.ingest.put_pipeline(id=PIPELINE_ID, processors=[{\n",
    "     \"foreach\": {\n",
    "         \"field\": \"synopsis_passages\",\n",
    "         \"processor\": {\n",
    "           \"inference\": {\n",
    "             \"field_map\": {\n",
    "               \"_ingest._value.text\": \"text_field\"\n",
    "             },\n",
    "             \"model_id\": \".elser_model_2\",\n",
    "             \"target_field\": \"_ingest._value.vector\",\n",
    "             \"on_failure\": [\n",
    "               {\n",
    "                 \"append\": {\n",
    "                   \"field\": \"_source._ingest.inference_errors\",\n",
    "                   \"value\": [\n",
    "                     {\n",
    "                       \"message\": \"Processor 'inference' in pipeline 'ml-inference-title-vector' failed with message '{{ _ingest.on_failure_message }}'\",\n",
    "                       \"pipeline\": \"ml-inference-title-vector\",\n",
    "                       \"timestamp\": \"{{{ _ingest.timestamp }}}\"\n",
    "                     }\n",
    "                   ]\n",
    "                 }\n",
    "               }\n",
    "             ]\n",
    "           }\n",
    "         }\n",
    "       }\n",
    "}])"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6363264c-a62f-4da1-8431-5cfed0c9b2f4",
   "metadata": {},
   "source": [
    "# Define the mapping"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "a9c14be7-6dd7-4413-8c6c-eabb883b820b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'books'})"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mappings = {\n",
    "   \"properties\": {\n",
    "       \"title\": {\"type\": \"text\"},\n",
    "       \"published_date\": {\"type\": \"text\"},\n",
    "       \"synopsis\": {\"type\": \"text\"},\n",
    "       \"synopsis_passages\": {\n",
    "         \"type\": \"nested\",\n",
    "         \"properties\": {\n",
    "             \"vector\": {\n",
    "               \"properties\": {\n",
    "                 \"is_truncated\": {\n",
    "                   \"type\": \"boolean\"\n",
    "                 },\n",
    "                 \"model_id\": {\n",
    "                   \"type\": \"text\",\n",
    "                   \"fields\": {\n",
    "                     \"keyword\": {\n",
    "                       \"type\": \"keyword\",\n",
    "                       \"ignore_above\": 256\n",
    "                     }\n",
    "                   }\n",
    "                 },\n",
    "                 \"predicted_value\": {\n",
    "                   \"type\": \"sparse_vector\"\n",
    "                 }\n",
    "            }\n",
    "         }\n",
    "     }\n",
    "   }\n",
    "}\n",
    "}\n",
    "# Create the index (deleting any previously existing index)\n",
    "es.indices.delete(index=\"books\", ignore_unavailable=True)\n",
    "es.indices.create(index=\"books\", mappings=mappings)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "0531c4d3-cffd-4e01-996c-e4626fd70991",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "length of books: 999\n"
     ]
    }
   ],
   "source": [
    "import json\n",
    "\n",
    "with open('book_summaries_1000_chunked.json') as f:\n",
    "   books = json.load(f)\n",
    "\n",
    "print(\"length of books: %d\" %(len(books)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "eef2e7a6-1c31-4bb4-bfb8-1dd7d451b3be",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/var/folders/kw/pvmj1zgs44l8t32cs8blf6sc0000gn/T/ipykernel_5116/1258494397.py:12: DeprecationWarning: Passing transport options in the API method is deprecated. Use 'Elasticsearch.options()' instead.\n",
      "  for ok, info in streaming_bulk(\n"
     ]
    }
   ],
   "source": [
    "from elasticsearch.helpers import streaming_bulk\n",
    "count = 0\n",
    "def generate_actions(books):\n",
    " for book in books:\n",
    "   doc = {}\n",
    "   doc[\"_index\"] = \"books\"\n",
    "   doc[\"pipeline\"] = \"vectorize_books_elser\"\n",
    "   doc[\"_source\"] = book\n",
    "   yield doc\n",
    "\n",
    "\n",
    "for ok, info in streaming_bulk(\n",
    "    client=es, \n",
    "    index=\"books\", \n",
    "    actions=generate_actions(books),\n",
    "    max_retries=3, \n",
    "    request_timeout=60*3, \n",
    "    chunk_size=10):\n",
    " if not ok:\n",
    "   print(f\"Unable to index {info['index']['_id']}: {info['index']['error']}\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0d4a1cdf-f39a-4591-8c08-8b987adb8c27",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python_3.11",
   "language": "python",
   "name": "python_3.11"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
